feat: Add `time_to_first_record` metric and adjusted performance stats (#826)
Conversation
- Track `time_to_first_record` per stream (time from stream start to first record)
- Add `records_per_second_adjusted` and `mb_per_second_adjusted` metrics
- Adjusted metrics only calculated when `time_to_first_record` > 10 seconds
- Adjusted metrics use first-record time instead of stream-start time
- Helps distinguish connector initialization slowness from data processing speed

Co-Authored-By: AJ Steers <[email protected]>
Original prompt from AJ Steers
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

```shell
# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1760042608-time-to-first-record-tracking' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1760042608-time-to-first-record-tracking'
```

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.
📝 Walkthrough

Adds per-stream tracking of time-to-first-record and introduces adjusted throughput metrics when the first-record delay exceeds a new threshold. Extends `ProgressTracker` with a new constant, state, and method; updates record tallying and logging to compute and include adjusted MB/s and related metrics conditionally.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant SRC as Source Connector
    participant PT as ProgressTracker
    participant LOG as Logger
    rect rgb(235, 245, 255)
        note right of SRC: Read loop emits records per stream
        SRC->>PT: tally_records_read(stream, count, bytes)
        alt first record for stream
            PT->>PT: stream_first_record_times[stream] = now
        end
    end
    PT->>PT: _log_read_metrics()
    PT->>PT: Compute time_to_first_record per stream
    alt time_to_first_record > THRESHOLD and timestamps exist
        PT->>PT: _calculate_adjusted_metrics(stream, count, tffr, mb_read)
        PT->>LOG: Emit stream_metrics with adjusted throughput
    else
        PT->>LOG: Emit stream_metrics without adjusted throughput
    end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes

Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)

airbyte/progress.py (1)

Lines 561-579: Fix `UnboundLocalError` when bytes tracking is disabled.

At line 577, `mb_read` is passed to `_calculate_adjusted_metrics`, but `mb_read` is only defined within the `if self.bytes_tracking_enabled:` block at lines 570-573. When bytes tracking is disabled, this will raise an `UnboundLocalError`. Apply this diff to fix the issue:
```diff
         stream_metrics[stream_name]["records_per_second"] = round(
             count
             / (
                 self.stream_read_end_times[stream_name]
                 - self.stream_read_start_times[stream_name]
             ),
             4,
         )
+        mb_read = 0.0
         if self.bytes_tracking_enabled:
             mb_read = self.stream_bytes_read[stream_name] / 1_000_000
             stream_metrics[stream_name]["mb_read"] = mb_read
             stream_metrics[stream_name]["mb_per_second"] = round(mb_read / duration, 4)

         if time_to_first_record is not None:
             adjusted_metrics = self._calculate_adjusted_metrics(
                 stream_name, count, time_to_first_record, mb_read
             )
             stream_metrics[stream_name].update(adjusted_metrics)
```
This ensures `mb_read` is always defined before being passed to `_calculate_adjusted_metrics`. The method already checks `self.bytes_tracking_enabled` internally before using `mb_read`, so passing `0.0` when bytes tracking is disabled is safe.
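The pitfall here is a general Python one: a name bound only inside a conditional branch is unbound when that branch is skipped. A minimal reproduction of both the bug and the suggested fix, using hypothetical function names rather than the PR's code:

```python
def throughput_buggy(
    bytes_read: int, duration: float, bytes_tracking_enabled: bool
) -> float:
    if bytes_tracking_enabled:
        mb_read = bytes_read / 1_000_000
    # When bytes_tracking_enabled is False, mb_read was never bound,
    # so the next line raises UnboundLocalError.
    return mb_read / duration


def throughput_fixed(
    bytes_read: int, duration: float, bytes_tracking_enabled: bool
) -> float:
    mb_read = 0.0  # default bound before the conditional, as the review suggests
    if bytes_tracking_enabled:
        mb_read = bytes_read / 1_000_000
    return mb_read / duration
```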
🧹 Nitpick comments (1)

airbyte/progress.py (1)

Lines 480-506: Consider simplifying the validation logic, wdyt? The checks at lines 491-492 are redundant since the caller at lines 542-543 and 552 already verifies these conditions before calling this method. If these are intended as defensive guards, that's fine, but documenting them as such would help clarify the intent.
You could simplify to:
```diff
 def _calculate_adjusted_metrics(
     self,
     stream_name: str,
     count: int,
     time_to_first_record: float,
     mb_read: float,
 ) -> dict[str, float]:
-    """Calculate adjusted performance metrics when time_to_first_record exceeds threshold."""
+    """Calculate adjusted performance metrics when time_to_first_record exceeds threshold.
+
+    Preconditions (validated by caller):
+    - stream_name exists in stream_first_record_times
+    - stream_name exists in stream_read_end_times
+    - time_to_first_record > TIME_TO_FIRST_RECORD_THRESHOLD_SECONDS
+    """
     adjusted_metrics = {}
-    if (
-        time_to_first_record > TIME_TO_FIRST_RECORD_THRESHOLD_SECONDS
-        and stream_name in self.stream_first_record_times
-        and stream_name in self.stream_read_end_times
-    ):
-        adjusted_duration = (
-            self.stream_read_end_times[stream_name]
-            - self.stream_first_record_times[stream_name]
-        )
-        if adjusted_duration > 0:
-            adjusted_metrics["records_per_second_adjusted"] = round(
-                count / adjusted_duration, 4
-            )
-            if self.bytes_tracking_enabled:
-                adjusted_metrics["mb_per_second_adjusted"] = round(
-                    mb_read / adjusted_duration, 4
-                )
+    adjusted_duration = (
+        self.stream_read_end_times[stream_name]
+        - self.stream_first_record_times[stream_name]
+    )
+    if adjusted_duration > 0:
+        adjusted_metrics["records_per_second_adjusted"] = round(
+            count / adjusted_duration, 4
+        )
+        if self.bytes_tracking_enabled:
+            adjusted_metrics["mb_per_second_adjusted"] = round(
+                mb_read / adjusted_duration, 4
+            )
     return adjusted_metrics
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte/progress.py (6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (4)

airbyte/progress.py (4)

Lines 108-109: Nice addition! The constant is well-named and clearly documents its purpose. The 10-second threshold seems reasonable for distinguishing initialization overhead from steady-state throughput.

Line 197: LGTM! The new tracking dictionary follows the established pattern and fits naturally alongside `stream_read_start_times` and `stream_read_end_times`.

Lines 285-286: Solid implementation! The logic correctly captures the timestamp of the first record for each stream without overwriting subsequent records. The placement after the stream start check is appropriate.

Lines 540-549: Clean implementation! The computation correctly handles the case where timestamps might be missing by setting `time_to_first_record` to `None`, which is then properly checked before calculating adjusted metrics.
feat: Add time_to_first_record metric and adjusted performance stats
Summary
Adds `time_to_first_record` tracking per stream to help distinguish between connector initialization latency and actual data processing speed. When the time to first record exceeds 10 seconds, additional adjusted throughput metrics (`records_per_second_adjusted`, `mb_per_second_adjusted`) are calculated using the first-record timestamp instead of the stream-start timestamp.

Key changes:

- New `stream_first_record_times` dict alongside existing `stream_read_start_times`
- `time_to_first_record` = first_record_time - stream_start_time
- Existing metrics (`records_per_second`, `mb_per_second`) remain unchanged

Example output:
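As an illustrative sketch of the rules described in the summary (a hypothetical standalone helper, not the PR's actual implementation or its real example output):

```python
TIME_TO_FIRST_RECORD_THRESHOLD_SECONDS = 10.0  # threshold stated in the PR


def compute_stream_metrics(
    count: int,
    mb_read: float,
    start: float,
    first_record: float,
    end: float,
) -> dict[str, float]:
    """Baseline throughput over the full stream duration, plus adjusted
    throughput (measured from the first record) when the time to first
    record exceeds the threshold."""
    duration = end - start
    metrics = {
        "records_per_second": round(count / duration, 4),
        "mb_per_second": round(mb_read / duration, 4),
        "time_to_first_record": round(first_record - start, 4),
    }
    adjusted_duration = end - first_record
    if (
        metrics["time_to_first_record"] > TIME_TO_FIRST_RECORD_THRESHOLD_SECONDS
        and adjusted_duration > 0
    ):
        metrics["records_per_second_adjusted"] = round(count / adjusted_duration, 4)
        metrics["mb_per_second_adjusted"] = round(mb_read / adjusted_duration, 4)
    return metrics
```

For example, a stream that takes 20 s to emit its first record and then processes 1000 records in the remaining 100 s reports `records_per_second_adjusted` of 10.0, versus a baseline `records_per_second` of about 8.33 over the full 120 s.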
Review & Testing Checklist for Human

- Verify that `time_to_first_record` values make sense and adjusted metrics only appear when the threshold is exceeded

Notes
Requested by AJ Steers (@aaronsteers) in Devin session: https://app.devin.ai/sessions/c77d07731aad483da4af06da3058d26e
The 10-second threshold was chosen as the point where initialization delay becomes significant enough to warrant adjusted throughput calculations. All existing unit tests pass, but this PR would benefit from integration testing with real connectors to validate the timing logic works correctly in practice.
Summary by CodeRabbit
New Features
Improvements
Important
Auto-merge enabled.
This PR is set to merge automatically when all requirements are met.
Note
Auto-merge may have been disabled. Please check the PR status to confirm.